-
Notifications
You must be signed in to change notification settings - Fork 932
Add Range-based Multipart download Subscriber for Pre-signed URLs #6331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/master/pre-signed-url-getobject
Are you sure you want to change the base?
Add Range-based Multipart download Subscriber for Pre-signed URLs #6331
Conversation
.../amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriber.java
Outdated
Show resolved
Hide resolved
.../amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriber.java
Outdated
Show resolved
Hide resolved
.../amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriber.java
Outdated
Show resolved
Hide resolved
...ware/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloadTestUtil.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void onSubscribe(Subscription s) { | ||
synchronized (lock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need synchronized here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah right, this was defensive programming, we don't need synchronization here since Reactive Streams spec guarantees onSubscribe is called at most once per Subscriber.
private long parseContentRangeForTotalSize(String contentRange) { | ||
Matcher matcher = CONTENT_RANGE_PATTERN.matcher(contentRange); | ||
if (!matcher.matches()) { | ||
throw new IllegalArgumentException("Invalid Content-Range header: " + contentRange); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
Did we handle the exception elsewhere? because this may cause the request to get stuck, we probably wanted to make sure we handle it elegantly, cancelling subscription, failing the future exceptionally etc
} | ||
} | ||
|
||
private long parseContentRangeForTotalSize(String contentRange) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this class to a util or something and add some unit tests?
future.complete(null); | ||
} | ||
|
||
public CompletableFuture<Void> future() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
future() method is used for testing to verify subscriber completion/error states
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, can we add SdkTestInternalApi annotation?
import software.amazon.awssdk.services.s3.utils.AsyncResponseTransformerTestSupplier; | ||
|
||
@WireMockTest | ||
class PresignedUrlMultipartDownloaderSubscriberWiremockTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of testing PresignedUrlMultipartDownloaderSubscriber, can we add some tests using S3AsyncClient directly? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientPutObjectWiremockTest.java
this.totalContentLength = parseContentRangeForTotalSize(response.contentRange()); | ||
this.totalParts = calculateTotalParts(totalContentLength, configuredPartSizeInBytes); | ||
log.debug(() -> String.format("Total content length: %d, Total parts: %d", totalContentLength, totalParts)); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using try catch as control flow it is a bit smell. Suggesting https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/KnownContentLengthAsyncRequestBodySubscriber.java#L193
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored to use Optional<SdkClientException>
validation pattern like KnownContentLengthAsyncRequestBodySubscriber
instead of try-catch for control flow.
return; | ||
} | ||
} | ||
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggesting creating a method to make it more clear, maybe hasMoreParts()?
} | ||
} | ||
|
||
private void validateResponse(GetObjectResponse response) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add all validations specified in the transfer manager SEP?
|
if (eTag == null) { | ||
this.eTag = response.eTag(); | ||
log.debug(() -> String.format("Multipart object ETag: %s", this.eTag)); | ||
} else if (response.eTag() != null && !eTag.equals(response.eTag())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test case for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed ETag validation, now we capture the ETag from the first part and use it for If-Match header validation on subsequent parts which aligns with the SEP requirements.
return (int) Math.ceil((double) contentLength / partSize); | ||
} | ||
|
||
private PresignedUrlDownloadRequest createPartRequest(int partIndex) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: suggesting createRangedGetRequest since this it not part request
.endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort())) | ||
.serviceConfiguration(S3Configuration.builder() | ||
.pathStyleAccessEnabled(true) | ||
.checksumValidationEnabled(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we disabling checksum validation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Range requests don't have checksums to validate, so this setting has no effect but false would be more explicit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a bit odd though since it's enabled by default and we are testing with it off explicitly. Can we enable checksum mode by default (I think we should if we can)
int totalComplete = completedParts.get(); | ||
log.debug(() -> String.format("Completed part %d", totalComplete)); | ||
|
||
synchronized (lock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to lock the entire block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can reduce the sync lock scope and make totalContentLength, totalParts, and eTag volatile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a new implementation for multipart downloads using presigned URLs with HTTP range headers instead of S3's native partNumber-based multipart API. The implementation uses reactive streams to handle concurrent part downloads while maintaining thread safety and proper response validation.
- Adds
PresignedUrlMultipartDownloaderSubscriber
class for range-based multipart downloads with presigned URLs - Implements size discovery through initial range requests and ETag validation for response consistency
- Provides comprehensive test coverage including TCK compliance tests and WireMock integration tests
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
File | Description |
---|---|
PresignedUrlMultipartDownloaderSubscriber.java | Core subscriber implementation for presigned URL multipart downloads using HTTP range requests |
PresignedUrlMultipartDownloaderSubscriberTckTest.java | Reactive Streams TCK compliance test with mocked S3AsyncClient |
PresignedUrlMultipartDownloaderSubscriberWiremockTest.java | Integration tests using WireMock for various download scenarios |
PresignedUrlMultipartDownloadTestUtil.java | Test utility class for WireMock stubbing and verification |
Comments suppressed due to low confidence (2)
services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloadTestUtil.java:138
- The XML tag '' appears to be incomplete or truncated. It should likely be '' to match the pattern used in line 125.
.withBody("<e><Code>400</Code><Message>test error message</Message></e>")));
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
e9d282d
to
c12ea47
Compare
Motivation and Context
This PR introduces support for multipart downloads using presigned URLs. Unlike the existing
MultipartDownloaderSubscriber
which relies on partNumber, this implementation uses HTTP range headers to achieve multipart functionality with pre-signed URLs.Modifications
PresignedUrlMultipartDownloaderSubscriber
: New reactive subscriber that handles size discovery through initial range requests, manages multipart state with thread-safe operations, and validates response consistency using ETag comparison.Testing
PresignedUrlMultipartDownloaderSubscriberTckTest
: Reactive Streams specification compliance test usingSubscriberWhiteboxVerification
with mocked S3AsyncClient for isolated testing.PresignedUrlMultipartDownloaderSubscriberWiremockTest
: WireMock tests covering multipart downloads with various part sizes, single-part downloads, and error scenarios.PresignedUrlMultipartDownloadTestUtil
: WireMock stubbing utilities for range request simulation and verification.Screenshots (if appropriate)
Types of changes
Checklist
mvn install
succeedsscripts/new-change
script and following the instructions. Commit the new file created by the script in.changes/next-release
with your changes.License